14f4d6f2b172d4a6fc0f5fd27f90cd3f9cd7d69f,test-framework/PurchaseAnalytics/src/main/java/com/continuuity/testsuite/purchaseanalytics/appdriver/PurchaseAnalyticsAppDriver.java,PurchaseAnalyticsAppDriver,main,#String[]#,51

Before Change


      // Run map reduce jobs.
      appDriver.startComponent("PurchaseHistoryBuilder", "mapreduce");
      appDriver.startComponent("PurchaseStatsBuilder", "mapreduce");
      appDriver.startComponent("RegionBuilder", "mapreduce");

      // TODO: Wait for status on MR jobs, correlate result from MR job

After Change


      appDriver.startComponent("PurchaseAnalyticsFlow", "flow");

      // check that flow started properly
      if (appDriver.checkComponentStatus("PurchaseAnalyticsFlow", "flow") != "RUNNING") {
        LOG.error(String.format("Unable to start PurchaseAnalytics flow."));
        return;
      }

      // Randomly generates n number of transactions.
      appDriver.generateTransactions();

      Customer returnedCustomer = null;
      // Verify that all customers streamed have been inserted
      for (Customer customer : appDriver.helper.getCustomers()) {

        if (!appDriver.customerExists(customer)) {
           LOG.error("Customer not found: " + gson.toJson(customer));
        }
      }

      for (Product product : appDriver.helper.getProducts()) {
        if (!appDriver.productExists(product)) {
          LOG.error("Customer not found: " + gson.toJson(product));
        }
      }

      try {

        // Run map reduce jobs.
        appDriver.startComponent("PurchaseHistoryBuilder", "mapreduce");

        // pool for for completion
        while (appDriver.checkComponentStatus("PurchaseHistoryBuilder", "mapreduce") == "RUNNING") {
          Thread.sleep(500);
        }

        appDriver.startComponent("PurchaseStatsBuilder", "mapreduce");

        // pool for for completion
        while (appDriver.checkComponentStatus("PurchaseStatsBuilder", "mapreduce") == "RUNNING") {
          Thread.sleep(500);
        }

        appDriver.startComponent("RegionBuilder", "mapreduce");

        // pool for for completion
        while (appDriver.checkComponentStatus("PurchaseStatsBuilder", "mapreduce") == "RUNNING") {
          Thread.sleep(500);
        }
      } catch (InterruptedException ex) {
        LOG.error(ex.getLocalizedMessage());
      }